@DerGuteMoritz DerGuteMoritz commented Oct 15, 2025

For `*Async` variants of `CompletionStage` methods, the (first) deferred argument was wrapped with `onto` so that its callbacks are invoked on the given `executor`. This results in a new deferred connected to the original one. In the case of the `*EitherAsync` methods, that new deferred would then leak when `alt` chooses the `other` one but the `onto` deferred ends up in an error state, since there was no way for the caller to attach an error handler to it.
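
To illustrate why the losing side of an either-style operation still matters, here is a small sketch using the JDK's own `CompletableFuture` rather than manifold's implementation (the names `loser`, `winner`, and `either` are made up for this example). The combined stage is decided by the side that completes first, while a later failure of the other side is only observable on that stage itself:

```clojure
(import '(java.util.concurrent CompletableFuture)
        '(java.util.function Function))

;; Hypothetical stages: `loser` is still pending, `winner` is already done.
(def loser (CompletableFuture.))
(def winner (CompletableFuture/completedFuture :fast))

;; applyToEither completes with whichever stage finishes first.
(def either
  (.applyToEither loser winner
                  (reify Function
                    (apply [_ x] [:got x]))))

;; The losing stage fails afterwards; `either` is unaffected by it.
(.completeExceptionally loser (ex-info "late failure" {}))

(def either-result (.get either))                      ; [:got :fast]
(def loser-failed? (.isCompletedExceptionally loser))  ; true
```

Anyone who wants to react to the late failure needs a handle on `loser` itself, which is the situation described above.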

Furthermore, for async variants of methods accepting two completion stages, `f` was never executed on the given executor. For example, `then-combine` was defined like this:

(defn- then-combine [d other ^BiFunction f]
  (assert-some other f)
  (fmap-deferred (zip d other)
                (fn [[x y]] (.apply f x y))))

`then-combine-async` (defined via `def-async-for-dual`) would wrap `d` with `(onto d executor)` before calling `then-combine`. However, the deferred passed to `fmap-deferred` was the one returned from `(zip d other)`, which would not have the given executor attached. Thus, `f` would have been executed on a different executor than desired.
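
For comparison, this is the behavior the JDK's reference implementation shows for the equivalent method. The sketch below uses plain `CompletableFuture` interop, not manifold; the executor, its thread name `"my-executor"`, and the var names are assumptions made for illustration only:

```clojure
(import '(java.util.concurrent CompletableFuture Executors ThreadFactory)
        '(java.util.function BiFunction))

;; Single-threaded executor whose only thread carries a recognizable name.
(def executor
  (Executors/newSingleThreadExecutor
   (reify ThreadFactory
     (newThread [_ r]
       (doto (Thread. ^Runnable r "my-executor")
         (.setDaemon true))))))

;; Both inputs are already complete; only `f` needs to run somewhere.
(def combined
  (.thenCombineAsync (CompletableFuture/completedFuture 1)
                     (CompletableFuture/completedFuture 2)
                     (reify BiFunction
                       (apply [_ x y]
                         ;; Record the thread on which `f` actually runs.
                         [(+ x y) (.getName (Thread/currentThread))]))
                     executor))

(def combined-result (.get combined))  ; [3 "my-executor"]

(.shutdown executor)
```

With `thenCombineAsync`, the function is submitted to the supplied executor even though both inputs were completed elsewhere, which is the contract the async two-stage methods are expected to follow.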

This patch addresses both issues by adding a `to` deferred as the final argument to every method implementation. This deferred is bound to the desired `executor`. The operation `f` is then attached as a callback to that `to` deferred and the deferred of the main operation is connected to it. For synchronous implementations, `nil` can be passed as `to`, in which case the callback is attached directly to the main operation deferred to save some overhead. This means that now the original deferred is passed to `alt` so that all error states can be handled by the caller.

It also removes the `def-async-for` and `def-async-for-dual` macros and opts to instead always explicitly pass the `to` deferred in all method implementations. This isn't much more verbose but hopefully serves to make the code a bit easier to follow.

Finally, it renames `fmap-deferred` to `shallow-chain` for consistency with the established `chain` naming and introduces the analogous `shallow-connect` (also private for now).

Prompted by these test failures.

Based on #245

TODO

  • Update commit message to match latest refactoring (executor instead of to deferred being passed to method implementations)

@DerGuteMoritz
Collaborator Author

@cosineblast Since you're the original author of the CompletionStage implementation, I figured maybe you could review this one? Happy to answer any questions 🙏

@DerGuteMoritz DerGuteMoritz force-pushed the fix-dropped-errors-in-completion-stage-impl branch 2 times, most recently from 4256055 to 7980c99 Compare October 16, 2025 08:44
@cosineblast cosineblast self-assigned this Oct 19, 2025
(toCompletableFuture [d]
(to-completable-future d))
(then-exceptionally d f nil))
;; Only available since Java 12
Contributor

Is there some kind of conditional to add these so they won't get forgotten?

Collaborator Author

I don't think there is a way to implement methods conditionally depending on the Java version. Are you aware of one? I mainly put these in for completion's sake (no pun intended 😄) so that we could uncomment them in the future when we decide to drop support for Java versions < 12 (which Clojure itself likely will do from 1.13 on).

(defn- then-compose [d ^Function f]
(defn- then-compose [d ^Function f result]
(assert-some f)
(let [d' (deferred)]
Contributor

Is this still necessary? Shouldn't the provided target (result) be used instead?

Collaborator Author

This is a bit subtle indeed 😅 The purpose of d' is to unwrap the deferred returned from f (= fd), so AFAICT we still need it. If it helps: thenCompose is basically the bind / flatMap operation of the CompletionStage monad 😬
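
As a rough illustration of the bind / flatMap point, here is how the JDK's own `CompletableFuture` behaves (plain interop, not manifold's implementation; `start` and `to-future` are names invented for this sketch). Applying a future-returning function with `thenApply` yields a nested future, whereas `thenCompose` unwraps it:

```clojure
(import '(java.util.concurrent CompletableFuture)
        '(java.util.function Function))

(def start (CompletableFuture/completedFuture 20))

;; A function that itself returns a completion stage.
(def to-future
  (reify Function
    (apply [_ x] (CompletableFuture/completedFuture (inc x)))))

;; thenApply maps without unwrapping: the result holds another future.
(def nested (.thenApply start to-future))

;; thenCompose maps and unwraps: the result holds the plain value.
(def flat (.thenCompose start to-future))

(def nested-is-future? (instance? CompletableFuture (.get nested)))  ; true
(def flat-value (.get flat))                                         ; 21
```

The extra unwrapping step is what an intermediate deferred such as `d'` has to provide when the mapping step itself is shallow.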

#(success! d' %)
#(error! d' %)))
(fn [error] (error! d' error)))
(-> (completion-stage-result d #(->deferred (.apply f %)) result)
Contributor

Would this ->deferred be run on the default executor?

Collaborator Author

See above - same applies here, of course.

Unlike `chain`, this function does not unwrap the result of `f`; it will only be applied to the
immediate value of `d`. This is for mimicking `CompletionStage`'s behavior."
[d f]
(let [d' (deferred)]
Contributor

Wouldn't this deferred run on the default executor regardless of where d is getting executed?

Collaborator Author

No, the executor of d' will be nil which means its callbacks will be invoked on the same executor / thread which realizes it. So basically, d' inherits the executor of d here. This behavior isn't documented very well, alas. The best I could find is this paragraph:

> In these cases, we can move the stream or deferred `onto` an executor, guaranteeing that all actions resulting from an operation will be enqueued onto a thread pool rather than immediately executed. This executor can be generated via `manifold.executor/instrumented-executor`, or using the convenience methods `fixed-thread-executor` and `utilization-executor`. In addition to providing automatic instrumentation, these executors will guarantee that any streams or deferred created within their scope will also be "on" that executor. For this reason, it's often sufficient to only call `onto` on a single stream in a topology, as everything downstream of it will transitively be executed on the executor.

The implementation lives here:

(clojure.core/loop []
  (when-let [^IDeferredListener l# (.poll ~'listeners)]
    (try
      (if (nil? ~executor)
        (~(if success? `.onSuccess `.onError) ^IDeferredListener l# ~val)
        (.execute ~(with-meta executor {:tag "java.util.concurrent.Executor"})
          (fn []
            (try
              (~(if success? `.onSuccess `.onError) l# ~val)
              (catch Throwable e#
                #_(.printStackTrace e#)
                (log/error e# "error in deferred handler"))))))
      (catch Throwable e#
        #_(.printStackTrace e#)
        (log/error e# "error in deferred handler")))
    (recur)))

And finally, here's an example which empirically confirms this behavior:

(let [ex (ex/fixed-thread-executor
          1
          {:thread-factory (ex/thread-factory
                            (constantly "my executor")
                            (deliver (promise) nil))})
      d (deferred ex)
      d' (shallow-chain d #(prn :one % (.getName (Thread/currentThread))))]
  (chain d' #(prn :two % (.getName (Thread/currentThread))))
  (success! d :ok))

Output:

:one :ok "my executor"
:two nil "my executor"

Contributor

every day i learn something new

Collaborator Author

Same here! 😄

Collaborator

Very well pointed out! I should probably have documented this detail with a comment in the original implementation, but I'm glad you've written it out explicitly.

@DerGuteMoritz DerGuteMoritz force-pushed the fix-dropped-errors-in-completion-stage-impl branch from 7980c99 to b31b83a Compare October 22, 2025 14:21
@DerGuteMoritz
Collaborator Author

Thanks for your review and approval @valerauko! Let's also wait for @cosineblast's approval before merging.

@DerGuteMoritz DerGuteMoritz force-pushed the fix-dropped-errors-in-completion-stage-impl branch from b31b83a to 1a2fc56 Compare October 23, 2025 15:56
(defn- then-apply [d ^Function f]
(assert-some f)
(fmap-deferred d #(.apply f %)))
(defn- completion-stage-result [d f to]
Collaborator

nitpick: I'm all in for dropping the Haskell-like fmap terminology and going for more familiar manifold terms (I loved your decision with shallow-chain), but can't we rename this completion-stage-result function to something more meaningful/specific?
Perhaps shallow-chain-with or shallow-chain-into? I don't think it deserves a documentation string since its implementation is very simple and shallow-chain is well explained (thanks for re-wording its docs btw), but I think this code would benefit from a better name for this, since it's used throughout the implementation.

Collaborator Author

> nitpick: I'm all in for dropping the Haskell-like fmap terminology and going for more familiar manifold terms (I loved your decision with shallow-chain),

Thanks, glad you like it, too 😊 Always feels a bit intrusive to rename functions like that but harmonizing it with the existing manifold terminology seemed prudent.

> but can't we rename this completion-stage-result function to something more meaningful/specific?

Ah yes, thanks for noticing! I'm not too happy with that name myself but forgot to mention it in the description.

> Perhaps shallow-chain-with or shallow-chain-into? I don't think it deserves a documentation string since its implementation is very simple and shallow-chain is well explained (thanks for re-wording its docs btw), but I think this code would benefit from a better name for this, since it's used throughout the implementation.

Good ideas. Perhaps even shallow-chain-onto to bring the onto back in? Looking at it now, we could actually change the signature to accept an executor instead of a deferred, making the onto name even more fitting. Originally, I was passing in a deferred even in the synchronous case but as you can see, I have since refactored it so that we're passing nil instead now. Will give this a try!

Collaborator Author

Just pushed 9fb23ad as a fixup commit (will squash before merge later). I realized that I could further decompose it so that we now also have shallow-onto 😄 WDYT?

Collaborator

yeah, this is much better, lgtm

Collaborator Author

I'm much happier with this now, too. Are you still reviewing? Otherwise, would you be so kind as to hit "approve" on the PR for posterity? 🙏

Collaborator Author

Thanks a lot!

(-> (completion-stage-result d #(->deferred (.apply f %)) to)
(on-realized (fn [fd]
(shallow-connect fd d'))
(fn [error]
Collaborator

I loved the join . fmap implementation for then-compose using shallow-connect here, well done!

Collaborator Author

Thanks 🙏 Though I have to admit that I'm not fluent in Monadic, so I am not familiar with the join . fmap idiom. The code here is just the nicest way I could come up with to express this operation 😅

@cosineblast cosineblast merged commit 41fc83f into detect-dropped-errors-in-tests Oct 28, 2025
1 check passed